package com.yxsk.relay.job.component.admin.monitor;

import com.alibaba.fastjson.TypeReference;
import com.yxsk.relay.job.component.admin.exception.RelayJobAdminRuntimeException;
import com.yxsk.relay.job.component.admin.monitor.callback.MonitorEventCallback;
import com.yxsk.relay.job.component.common.constant.UriConstant;
import com.yxsk.relay.job.component.common.exception.remote.RemoteCallException;
import com.yxsk.relay.job.component.common.protocol.caller.RemoteCaller;
import com.yxsk.relay.job.component.common.protocol.message.base.ResultResponse;
import com.yxsk.relay.job.component.common.protocol.message.execute.JobExecuteResponse;
import com.yxsk.relay.job.component.common.protocol.message.execute.async.query.AsyncJobQueryRequest;
import com.yxsk.relay.job.component.common.protocol.message.execute.async.query.AsyncJobQueryResponse;
import com.yxsk.relay.job.component.common.thread.WakeupAbleTask;
import com.yxsk.relay.job.component.common.thread.observe.ObservableThread;
import com.yxsk.relay.job.component.common.utils.Assert;
import com.yxsk.relay.job.component.common.utils.CollectionUtils;
import com.yxsk.relay.job.component.common.utils.DateUtils;
import com.yxsk.relay.job.component.common.utils.SerialNoUtils;
import com.yxsk.relay.job.component.common.vo.ExecuteResult;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * @Author 11376
 * @CreaTime 2019/6/17 22:22
 * @Description
 */
@Slf4j
public class DefaultAsyncJobMonitor extends WakeupAbleTask implements AsyncJobStatusMonitor {

    private long interval = TimeUnit.MILLISECONDS.convert(3, TimeUnit.SECONDS);

    // 监控列表 key: jobId, value: 监控信息
    private Map<String, MonitorJobInfo> monitorMap;

    // 等待监控列表
    private Map<String, MonitorJobInfo> waitMonitorMap;

    private RemoteCaller remoteCaller;

    public DefaultAsyncJobMonitor(RemoteCaller remoteCaller) {
        this.remoteCaller = remoteCaller;
        this.monitorMap = new ConcurrentHashMap<>(16, 0.75f);
        this.waitMonitorMap = new ConcurrentHashMap<>(8, 0.75f);
    }

    @Override
    public void start() {
        this.reloadExecutingJob();
        new ObservableThread<>(this).start();
    }

    @Override
    public void reloadExecutingJob() {
        // Nothing.
    }

    @Override
    public void stop() {
        this.wakeup();
        try {
            this.workThread.join();
        } catch (InterruptedException e) {
            log.error("Interrupted while waiting for the worker thread to end.", e);
        }
    }

    @Override
    public void monitor(MonitorRequest monitorRequest) {
        this.monitor(monitorRequest, true);
    }

    @Override
    public void monitor(MonitorRequest monitorRequest, boolean soon) {
        this.monitor(monitorRequest, -1L, soon);
    }

    @Override
    public void monitor(MonitorRequest monitorRequest, long executeTimeout) {
        this.monitor(monitorRequest, executeTimeout, true);
    }

    @Override
    public void monitor(MonitorRequest monitorRequest, long executeTimeout, boolean soon) {
        // 提交监控时
        Assert.notNull(monitorRequest, "Monitor request not be null");
        MonitorJobInfo info = new MonitorJobInfo();
        info.setHost(monitorRequest.getHost());
        info.setPort(monitorRequest.getPort());
        info.setToken(monitorRequest.getToken());
        info.setExecuteJobId(monitorRequest.getExecuteJobId());
        info.setResultCallback(monitorRequest.getResultCallback());
        info.setTimeout(executeTimeout);
        info.setRequestTime(DateUtils.getCurrentTimeMillis());
        if (executeTimeout > 0L) {
            info.setTimeout(executeTimeout);
        }
        info.setProcessed(false);
        info.setMonitor(soon);
        if (soon) {
            // 立即提交监控
            this.monitorMap.put(monitorRequest.getExecuteJobId(), info);
        } else {
            this.waitMonitorMap.put(monitorRequest.getExecuteJobId(), info);
        }

        if (log.isDebugEnabled()) {
            log.debug("Push monitor information success, job id: {}, soon: {}", monitorRequest.getExecuteJobId(), soon);
        }
    }

    @Override
    public void startMonitor(String jobId) {
        MonitorJobInfo info = this.waitMonitorMap.get(jobId);
        if (info == null) {
            throw new RelayJobAdminRuntimeException("No task monitoring information found, job id: " + jobId);
        }
        synchronized (info) {
            if (info.getProcessed()) {
                // 已处理
                this.waitMonitorMap.remove(jobId);
                if (log.isDebugEnabled()) {
                    log.debug("Monitor information processed, remove wait monitor queue, job id: {}", jobId);
                }
            } else {
                // 移交监控列表
                info.setRequestTime(DateUtils.getCurrentTimeMillis());
                this.monitorMap.put(jobId, info);
                this.waitMonitorMap.remove(jobId);
                if (log.isDebugEnabled()) {
                    log.debug("Handover monitoring queue success, job id: {}", jobId);
                }
            }
        }
    }

    @Override
    public void giveUpMonitor(String jobId) {
        this.waitMonitorMap.remove(jobId);
        this.monitorMap.remove(jobId);
    }

    @Override
    public void repealMonitor(String jobId) {
        Assert.hasLength(jobId, "Job id must not empty");
        MonitorRequest request = this.monitorMap.get(jobId);
        if (request != null && !CollectionUtils.isEmpty(request.getChildrenJob())) {
            // 先撤销其子任务监控
            request.getChildrenJob().stream().forEach(monitorRequest -> this.repealMonitor(jobId));
        }
        this.monitorMap.remove(jobId);
    }

    @Override
    public void doFinishCallback(JobExecuteCallbackInfo callbackInfo) {
        Assert.notNull(callbackInfo, "Callback request not be null");

        if (log.isDebugEnabled()) {
            log.debug("Receive asynchronous execution callback, callback information: {}", callbackInfo);
        }
        // 任务完成回调
        JobExecuteResponse response = new JobExecuteResponse();
        response.setExecuteStatus(callbackInfo.getResultCode());
        response.setResult(callbackInfo.getResult());
        response.setRespTime(DateUtils.getCurrentDate());
        response.setErrorMsg(callbackInfo.getErrorMsg());

        this.syncCallback(callbackInfo.getJobId(), response);
        // throw new JobMonitorException("Not found monitor job information, job id: " + callbackInfo.getJobId());
    }

    private void syncCallback(String jobId, JobExecuteResponse response) {
        MonitorJobInfo info = this.monitorMap.get(jobId);
        if (info == null) {
            info = this.waitMonitorMap.get(jobId);
            if (info == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Not found monitor information of job id: {}", jobId);
                }
                return;
            }
        }
        synchronized (info) {
            // 再次检查
            MonitorJobInfo waitMonitorInfo = this.waitMonitorMap.get(jobId);
            if (this.monitorMap.get(jobId) == null && waitMonitorInfo == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Synchronized handle not found monitor information of job id: {}", jobId);
                }
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Synchronized handle job execute result. \n >>>> monitor information: {}\n #### response: {}", info, response);
            }
            MonitorEventCallback callback = info.getResultCallback();
            if (callback != null) {
                if (ExecuteResult.ExecuteResultCode.OK.getCode().equals(response.getExecuteStatus())) {
                    callback.onFinish(jobId, response);
                } else {
                    callback.onError(jobId, new RemoteCallException(response.getErrorMsg()));
                }
            }
            // 移除监控列表
            this.monitorMap.remove(jobId);
            if (waitMonitorInfo != null) {
                waitMonitorInfo.setProcessed(true);
            }
        }
    }

    @Override
    protected Object doWork() {
        while (!this.isInterrupted()) {
            LinkedList<String> jobIds = new LinkedList<>(this.monitorMap.keySet());
            if (!CollectionUtils.isEmpty(jobIds)) {
                jobIds.stream().forEach(this::doMonitor);
            }
            try {
                Thread.sleep(this.interval);
            } catch (InterruptedException e) {
                log.warn("Task is interrupted.");
            }
        }
        return "Asynchronous job monitoring service stopped.";
    }

    private void doMonitor(String jobId) {
        boolean complete = queryResult(jobId);
        // 查询失败时检查是否超时
        if (!complete) {
            // 检查是否超时
            MonitorJobInfo info = this.monitorMap.get(jobId);
            if (info != null && isTimeout(info)) {
                // 超时
                /*String errorMsg = MessageFormat.format("Job execute timeout, execute host[{0}], port[{1}], execute time[{2}], jobId[{3}]",
                        info.getHost(),
                        info.getPort(),
                        DateUtils.formatDate(info.getRequestTime(), "yyyy-MM-dd HH:mm:ss zzz"),
                        info.getExecuteJobId());*/

                info.getResultCallback().onExecuteTimeout(info.getExecuteJobId());

                // 移除监控列表
                this.monitorMap.remove(jobId);
            }
        }

    }

    private boolean queryResult(String jobId) {
        // 实例化请求
        MonitorJobInfo info = this.monitorMap.get(jobId);
        // 有可能此时已经主动通知
        if (info != null) {
            AsyncJobQueryRequest request = new AsyncJobQueryRequest();
            request.setJobId(jobId);
            request.setRequestTime(DateUtils.getCurrentDate());
            request.setVersion(UriConstant.VERSION_NO);
            request.setSerialNo(SerialNoUtils.nextId());
            try {
                Map<String, String> header = null;
                if (StringUtils.isNotEmpty(info.getToken())) {
                    header = new HashMap<>(2);
                    header.put(UriConstant.AUTHORIZATION_HEADER_KEY, info.getToken());
                }
                ResultResponse<AsyncJobQueryResponse> resp = this.remoteCaller.call(request, header, this.remoteCaller.buildUri(info.getHost(), info.getPort(), UriConstant.TASK_ASYNC_EXECUTE_RESULT_QUERY_URI), new TypeReference<ResultResponse<AsyncJobQueryResponse>>() {
                });
                return handleResponse(info, resp);
            } catch (Exception e) {
                log.error("Query task execute result error.", e);
            }
        }
        // 未在列表中找到监控信息则返回成功
        return true;
    }

    /**
     * @param info
     * @Author 11376
     * @Description 处理查询结果
     * @CreateTime 2019/6/19 20:03
     * @Return
     */
    private boolean handleResponse(MonitorJobInfo info, ResultResponse<AsyncJobQueryResponse> resultResponse) {
        AsyncJobQueryResponse data = resultResponse.getData();
        if (ResultResponse.isOk(resultResponse)) {
            if (ExecuteResult.ExecuteResultCode.OK.getCode().equals(data.getResultCode())) {
                // 执行成功
                JobExecuteResponse response = new JobExecuteResponse();
                response.setExecuteStatus(data.getResultCode());
                response.setRespTime(data.getRespTime());
                response.setResult(data.getResult());
                this.syncCallback(info.getExecuteJobId(), response);
                return true;
            } else if (ExecuteResult.ExecuteResultCode.ERROR.getCode().equals(data.getResultCode()) ||
                    ExecuteResult.ExecuteResultCode.JOB_NOT_FOUND.getCode().equals(data.getResultCode())) {
                // 执行失败
                JobExecuteResponse response = new JobExecuteResponse();
                response.setExecuteStatus(data.getResultCode());
                response.setRespTime(data.getRespTime());
                response.setErrorMsg(data.getErrorMsg());
                this.syncCallback(info.getExecuteJobId(), response);
                return true;
            }
            // 其他状态不做处理
        } else {
            log.error("Query task execute response error. Response code: {}, message: {}, data: {}", resultResponse.getCode(), resultResponse.getMessage(), data);
            return false;
        }
        return false;
    }

    private boolean isTimeout(MonitorJobInfo info) {
        if (info.getTimeout() == null || info.getTimeout() <= 0L) {
            return false;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(new Date(info.getRequestTime()));
        calendar.add(Calendar.MILLISECOND, Math.toIntExact(info.getTimeout()));
        return DateUtils.compare(calendar.getTime(), DateUtils.getCurrentDate()) <= 0;
    }

}
